package controller

import (
	"context"

	"gitee.com/hexug/devcloud/maudit/apps"
	"gitee.com/hexug/devcloud/maudit/apps/audit"
	"gitee.com/hexug/devcloud/maudit/common/logger"
	"github.com/segmentio/kafka-go"
)

// 审计日志保存
type AuduitLogSaveConroller struct {
	brokerAddress []string
	groupId       string
	topic         string

	reader *kafka.Reader
	audit  audit.Server
}

func NewAuduitLogSaveConroller(broker []string, groupID, topic string) *AuduitLogSaveConroller {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: broker,
		// Consumer Groups, 不指定就是普通的一个Consumer
		GroupID: groupID,
		// 可以指定Partition消费消息
		// Partition: 0,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	return &AuduitLogSaveConroller{
		brokerAddress: broker,
		groupId:       groupID,
		topic:         topic,
		reader:        r,
		audit:         apps.GetApp(audit.AppName).(audit.Server),
	}
}

func (a *AuduitLogSaveConroller) Close() error {
	return a.reader.Close()
}

func (a *AuduitLogSaveConroller) Run(ctx context.Context) {
	for {
		m, err := a.reader.ReadMessage(ctx)
		if err != nil {
			break
		}
		logger.L().Debugf("message at topic/partition/offset %v/%v/%v: %s\n", m.Topic, m.Partition, m.Offset, string(m.Value))
		ad := audit.LoadAuditLogJsonFromByte(m.Value)
		if ad == nil {
			logger.L().Error("转换kafka消息出错")
			continue
		}
		_, err = a.audit.SaveAuditLog(ctx, ad)
		if err != nil {
			logger.L().Errorf("保存审计日志信息出错: %s", err.Error())
		}
		// 处理完消息后需要提交该消息已经消费完成, 消费者挂掉后保存消息消费的状态
		if err := a.reader.CommitMessages(context.Background(), m); err != nil {
			logger.L().Errorf("提交消息出错: %s", err.Error())
			continue
		}
	}
}
